Overview
The fetch_company_filings.py script retrieves regulatory filings (annual reports, quarterly results, compliance filings) for each stock from two separate API endpoints and merges the results, so filings present in only one of the two databases are still captured. This hybrid approach maximizes filing coverage.
Purpose
Fetches company regulatory filings including:
- Annual Reports
- Quarterly Results (PDF filings)
- Board Meeting Intimations
- SEBI Reg 7(2) Insider Trading disclosures
- Corporate Governance Reports
- LODR (Listing Obligations and Disclosure Requirements) filings
API Endpoints
Endpoint 1: Legacy Company Filings
https://ow-static-scanx.dhan.co/staticscanx/company_filings
Endpoint 2: LODR Filings
https://ow-static-scanx.dhan.co/staticscanx/lodr
Request Payload
{
    "data": {
        "isin": "<ISIN>",
        "pg_no": 1,
        "count": 100
    }
}
Parameters
- isin: ISIN code of the security
- pg_no: Page number for pagination
- count: Number of filings to retrieve (maximum tested: 100)
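A single page of the payload above can be exercised on its own. This is a minimal sketch using only the standard library so it runs without the pipeline's helpers; the real script uses requests and get_headers(), and the plain Content-Type header here is an illustrative stand-in.

```python
import json
from urllib import request, error

URL = "https://ow-static-scanx.dhan.co/staticscanx/company_filings"

def build_payload(isin, page=1, count=100):
    """Construct the documented request body."""
    return {"data": {"isin": isin, "pg_no": page, "count": count}}

def fetch_page(isin, page=1, timeout=10):
    """POST one page to the endpoint; return the 'data' list, or [] on any failure."""
    body = json.dumps(build_payload(isin, page)).encode("utf-8")
    req = request.Request(URL, data=body,
                          headers={"Content-Type": "application/json"})
    try:
        with request.urlopen(req, timeout=timeout) as res:
            return json.loads(res.read()).get("data", []) or []
    except (error.URLError, ValueError):
        return []
```

Calling fetch_page with pg_no incremented would page through results, though the script itself only requests page 1 with count=100.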
Output Files
company_filings/{SYMBOL}_filings.json
Per-stock filing data with structure:

{
    "code": 0,
    "data": [
        {
            "news_id": "unique_id",
            "news_date": "2024-01-15",
            "caption": "Annual Report 2023-24",
            "descriptor": "Financial Results",
            "file_url": "https://www.bseindia.com/..."
        }
    ]
}
Sorted by news_date (descending - latest first). Deduplicated by news_id + news_date + caption.
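Downstream consumers can read a per-stock file back with a small helper. This is a sketch under the structure documented above; the load_filings name is hypothetical, not part of the pipeline.

```python
import json

def load_filings(path):
    """Read a per-stock output file and return its filing list.

    Returns [] if the file's "code" field is non-zero.
    """
    with open(path) as f:
        doc = json.load(f)
    return doc.get("data", []) if doc.get("code") == 0 else []
```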
Function Signature
def fetch_filings(item):
    """
    Fetches filings for a single stock from both endpoints and merges.

    Args:
        item (dict): Stock object with 'Symbol' and 'ISIN' keys

    Returns:
        str: Status - "success", "skipped", or "empty"

    Process:
        1. Check if filing exists and FORCE_UPDATE flag
        2. Fetch from /company_filings endpoint
        3. Fetch from /lodr endpoint
        4. Merge and deduplicate by (news_id, date, caption)
        5. Sort by date descending
        6. Save to company_filings/{SYMBOL}_filings.json
    """
Dependencies
requests - HTTP client
json - JSON processing
os - File operations
time - Performance tracking
concurrent.futures.ThreadPoolExecutor - Parallel execution
pipeline_utils.BASE_DIR - Base directory path
pipeline_utils.get_headers() - Standard API headers
master_isin_map.json - ISIN to Symbol mapping
Threading Configuration
- MAX_THREADS: number of concurrent threads for parallel fetching (20)
- FORCE_UPDATE: if true, refreshes all filings even if the output file exists; set to false to skip existing files
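The FORCE_UPDATE gate can be expressed as a one-line predicate. A minimal sketch, assuming the constant names used elsewhere in this document; the should_fetch helper itself is illustrative.

```python
import os

MAX_THREADS = 20      # concurrent worker threads
FORCE_UPDATE = False  # False -> skip stocks whose output file already exists

def should_fetch(output_path, force=FORCE_UPDATE):
    """Return True when the output file is missing or a refresh is forced."""
    return force or not os.path.exists(output_path)
```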
Code Example
import json
import os
import time
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
from pipeline_utils import BASE_DIR, get_headers

INPUT_FILE = os.path.join(BASE_DIR, "master_isin_map.json")
OUTPUT_DIR = os.path.join(BASE_DIR, "company_filings")
MAX_THREADS = 20
FORCE_UPDATE = True  # set False to skip stocks that already have a filings file

URL_FILINGS = "https://ow-static-scanx.dhan.co/staticscanx/company_filings"
URL_LODR = "https://ow-static-scanx.dhan.co/staticscanx/lodr"

def fetch_endpoint(url, isin, headers):
    """POST the standard payload to one endpoint; return its data list or []."""
    payload = {"data": {"isin": isin, "pg_no": 1, "count": 100}}
    try:
        res = requests.post(url, json=payload, headers=headers, timeout=10)
        if res.status_code == 200:
            return res.json().get("data", []) or []
    except requests.RequestException:
        pass
    return []

def fetch_filings(item):
    symbol = item.get("Symbol")
    isin = item.get("ISIN")
    if not symbol or not isin:
        return None

    output_path = os.path.join(OUTPUT_DIR, f"{symbol}_filings.json")
    if not FORCE_UPDATE and os.path.exists(output_path):
        return "skipped"

    headers = get_headers()

    # Fetch from both endpoints and combine
    combined = (fetch_endpoint(URL_FILINGS, isin, headers)
                + fetch_endpoint(URL_LODR, isin, headers))

    # Merge & deduplicate
    unique_map = {}
    for entry in combined:
        nid = entry.get("news_id")
        date_str = entry.get("news_date")
        caption = entry.get("caption") or entry.get("descriptor") or "Unknown"
        # Create unique key: news_id if present, else date + caption
        key = nid if nid else f"{date_str}_{caption}"
        if key not in unique_map:
            unique_map[key] = entry
        elif entry.get("file_url") and not unique_map[key].get("file_url"):
            # Prefer the duplicate that carries a document link
            unique_map[key] = entry

    final_list = list(unique_map.values())
    final_list.sort(key=lambda x: x.get("news_date", "1900-01-01"), reverse=True)
    if not final_list:
        return "empty"

    wrapped_data = {"code": 0, "data": final_list}
    with open(output_path, "w") as f:
        json.dump(wrapped_data, f, indent=4)
    return "success"

def main():
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    with open(INPUT_FILE, "r") as f:
        stock_list = json.load(f)

    total = len(stock_list)
    print(f"Starting Filing Fetch (Threads: {MAX_THREADS}) for {total} stocks...")
    start = time.time()
    counts = {"success": 0, "skipped": 0, "empty": 0}

    with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
        future_to_stock = {executor.submit(fetch_filings, item): item.get("Symbol")
                           for item in stock_list}
        for future in as_completed(future_to_stock):
            result = future.result()
            if result in counts:
                counts[result] += 1

    print(f"Done in {time.time() - start:.1f}s: {counts}")

if __name__ == "__main__":
    main()
Usage
python3 fetch_company_filings.py
- Execution Time: ~3-5 minutes for 2,775 stocks
- API Calls: 5,550 requests (2 endpoints × 2,775 stocks)
- Output: 2,775 individual JSON files in company_filings/ directory
- Concurrency: 20 parallel threads
- Deduplication: By news_id + news_date + caption
Deduplication Logic
- Fetches from both endpoints for each stock
- Combines results into a single array
- Creates a unique key using news_id (if available), or the {news_date}_{caption} combination
- Keeps the first occurrence unless the duplicate has a file_url and the original doesn't
- Sorts the final list by date (newest first)
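The rules above condense into a small standalone helper; the dedupe name and the sample records below are fabricated for illustration, but the key-building, file_url preference, and sort order mirror what this document describes.

```python
def dedupe(entries):
    """Merge filings keyed by news_id (or date+caption fallback),
    preferring duplicates that carry a file_url, newest first."""
    unique = {}
    for e in entries:
        caption = e.get("caption") or e.get("descriptor") or "Unknown"
        key = e.get("news_id") or f"{e.get('news_date')}_{caption}"
        if key not in unique:
            unique[key] = e
        elif e.get("file_url") and not unique[key].get("file_url"):
            unique[key] = e  # duplicate, but this copy has the document link
    return sorted(unique.values(),
                  key=lambda e: e.get("news_date", "1900-01-01"),
                  reverse=True)

sample = [
    {"news_id": "n1", "news_date": "2024-01-15", "caption": "AGM"},
    {"news_id": "n1", "news_date": "2024-01-15", "caption": "AGM",
     "file_url": "https://example.com/a.pdf"},
    {"news_date": "2024-03-01", "caption": "Results"},  # no news_id: fallback key
]
```

Running dedupe(sample) yields two entries: the March record first, then the AGM record with its file_url retained.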
Notes
- Hybrid approach ensures maximum filing coverage by querying two separate databases
- Automatically creates the company_filings/ directory if it doesn't exist
- Set FORCE_UPDATE = False to skip re-fetching existing files (useful for incremental updates)
- 10-second timeout per request to handle slow responses